{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# More efficient data movement with MPI" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Just like [we did](memmap%20Broadcast.ipynb) manually with memmap,\n", "you can move data more efficiently with MPI by sending it to just one engine,\n", "and using MPI to broadcast it to the rest of the engines.\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import socket\n", "import os, sys, re\n", "\n", "import numpy as np\n", "\n", "import ipyparallel as ipp" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For this demo, I will connect to a cluster with engines started with MPI.\n", "\n", "One way to do so would be:\n", "\n", " ipcluster start -n 64 --engines=MPI --profile mpi\n", " \n", "In this directory is a docker-compose file to simulate multiple engine sets launched with MPI.\n", "\n", "I ran this example with a cluster on a 64-core remote VM,\n", "so communication between the client and controller is over the public internet,\n", "while communication between the controller and engines is local." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "rc = ipp.Client(profile=\"mpi\")\n", "rc.wait_for_engines(64)\n", "eall = rc.broadcast_view(coalescing=True)\n", "root = rc[0]" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "64" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "len(rc)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "root['a'] = 5" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "%px from mpi4py.MPI import COMM_WORLD as MPI" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can get a mapping from IPython Parallel engine id to MPI rank, in case the two don't match.\n", "\n", "In recent-enough IPython Parallel,\n", "they usually do match, because IPython engines request their MPI rank as their engine id."
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{0: 0,\n", " 1: 1,\n", " 2: 2,\n", " 3: 3,\n", " 4: 4,\n", " 5: 5,\n", " 6: 6,\n", " 7: 7,\n", " 8: 8,\n", " 9: 9,\n", " 10: 10,\n", " 11: 11,\n", " 12: 12,\n", " 13: 13,\n", " 14: 14,\n", " 15: 15,\n", " 16: 16,\n", " 17: 17,\n", " 18: 18,\n", " 19: 19,\n", " 20: 20,\n", " 21: 21,\n", " 22: 22,\n", " 23: 23,\n", " 24: 24,\n", " 25: 25,\n", " 26: 26,\n", " 27: 27,\n", " 28: 28,\n", " 29: 29,\n", " 30: 30,\n", " 31: 31,\n", " 32: 32,\n", " 33: 33,\n", " 34: 34,\n", " 35: 35,\n", " 36: 36,\n", " 37: 37,\n", " 38: 38,\n", " 39: 39,\n", " 40: 40,\n", " 41: 41,\n", " 42: 42,\n", " 43: 43,\n", " 44: 44,\n", " 45: 45,\n", " 46: 46,\n", " 47: 47,\n", " 48: 48,\n", " 49: 49,\n", " 50: 50,\n", " 51: 51,\n", " 52: 52,\n", " 53: 53,\n", " 54: 54,\n", " 55: 55,\n", " 56: 56,\n", " 57: 57,\n", " 58: 58,\n", " 59: 59,\n", " 60: 60,\n", " 61: 61,\n", " 62: 62,\n", " 63: 63}" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "mpi_ranks = eall.apply_async(lambda : MPI.Get_rank()).get_dict()\n", "root_rank = root.apply_sync(lambda : MPI.Get_rank())\n", "mpi_ranks" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "32" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sz = 2048\n", "data = np.random.random((sz, sz))\n", "megabytes = data.nbytes // (1024 * 1024)\n", "megabytes" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "e3943864933941bbb65e6ad17e3c589f", "version_major": 2, "version_minor": 0 }, "text/plain": [ "_push: 0%| | 0/64 [00:00